package util;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import po.RedisInfoDetail;
import redis.clients.jedis.Client;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisPubSub;
import redis.clients.jedis.JedisShardInfo;
import redis.clients.jedis.Response;
import redis.clients.jedis.ShardedJedis;
import redis.clients.jedis.ShardedJedisPipeline;
import redis.clients.jedis.ShardedJedisPool;
import redis.clients.jedis.exceptions.JedisConnectionException;

public class ShardedJedisUtil {

	private static Logger LOGGER = LoggerFactory.getLogger(ShardedJedisUtil.class);

	private static int redisMaxActive; // 连接池的最大数据库连接数
	private static int redisMaxIdle; // 最大空闲数
	private static long redisMaxWait; // 最大等待连接时间
	private static boolean redisTestOnBorrow; // 在borrow一个jedis实例时，是否提前进行alidate操作；如果为true，则得到的jedis实例均是可用的

	// 切片连接池
	private static ShardedJedisPool shardedJedisPool;
	// EMAP属性缓存
	private static Map<String, String> redisPropCache = null;

	private static final int TIMEOUT = 5;
	private static final int EXPIRE = 5;
	private static final boolean debugMode = false; // 调试使用，发布到现场请设置为false
	private static Set<String> importRedisInfoProps = new HashSet<String>(); // 比较重要的redis信息属性

	static {
		if (redisPropCache == null) {
			// 初始化Redis属性缓存
			initRedisProp();

			redisMaxActive = ConvertUtil.intOf(redisPropCache.get("redis.maxActive"));
			redisMaxIdle = ConvertUtil.intOf(redisPropCache.get("redis.maxIdle"));
			redisMaxWait = ConvertUtil.longOf(redisPropCache.get("redis.maxWait"));
		}

		importRedisInfoProps.add("connected_clients"); // 连接的客户端数
		importRedisInfoProps.add("blocked_clients"); // 被阻塞的客户端数
		importRedisInfoProps.add("used_memory_human"); // 使用内存
		importRedisInfoProps.add("used_memory_rss_human"); // 系统给redis分配的内存（即常驻内存）
		importRedisInfoProps.add("used_memory_peak_human"); // 最大使用内存总量（峰值）

		importRedisInfoProps.add("total_connections_received"); // 自启动起连接过的客户端的总数量
		importRedisInfoProps.add("total_commands_processed"); // 自启动起运行命令的总数
		importRedisInfoProps.add("rejected_connections"); // 因为最大客户端连接书限制，而导致被拒绝连接的个数
		importRedisInfoProps.add("role"); // 角色
		importRedisInfoProps.add("connected_slaves"); // 连接的从库数
		for (int i = 0; i <= 15; i++) {
			importRedisInfoProps.add("db" + i);
		}

		// 初始化连接池
		init();
	}

	/**
	 * 初始化连接池
	 */
	private static void init() {
		if (shardedJedisPool != null) {
			return;
		}
		synchronized (ShardedJedisUtil.class) {
			if (shardedJedisPool == null) {

				JedisPoolConfig config = new JedisPoolConfig();
				config.setMaxTotal(redisMaxActive);
				config.setMaxIdle(redisMaxIdle);
				config.setMaxWaitMillis(redisMaxWait);
				config.setTestOnBorrow(redisTestOnBorrow);
				config.setTestWhileIdle(true);

				List<JedisShardInfo> jdsInfoList = new ArrayList<JedisShardInfo>();

				try {
					if (StringUtils.isNotBlank(redisPropCache.get("redis.ip")) && StringUtils.isNotBlank(redisPropCache.get("redis.port"))) {
						JedisShardInfo info = new JedisShardInfo(redisPropCache.get("redis.ip"), ConvertUtil.intOf(redisPropCache.get("redis.port")));
						info.setPassword(redisPropCache.get("redis.password"));

						// 指定db
						Class<? extends JedisShardInfo> clz = info.getClass();
						Field declaredField = clz.getDeclaredField("db");
						declaredField.setAccessible(true);
						declaredField.set(info, ConvertUtil.intOf(redisPropCache.get("redis.database")));

						jdsInfoList.add(info);
						LOGGER.info("-----------默认分片：" + redisPropCache.get("redis.ip") + ":" + redisPropCache.get("redis.port") + "/" + ConvertUtil.intOf(redisPropCache.get("redis.database")));
					}
				} catch (Exception e) {
					LOGGER.error("init error", e);
				}

				for (int i = 1; i <= 20; i++) {
					if (StringUtils.isNotBlank(redisPropCache.get("redis.ip" + i)) && StringUtils.isNotBlank(redisPropCache.get("redis.port" + i))) {
						JedisShardInfo info = new JedisShardInfo(redisPropCache.get("redis.ip" + i), ConvertUtil.intOf(redisPropCache.get("redis.port" + i)));
						info.setPassword(redisPropCache.get("redis.password" + i));
						jdsInfoList.add(info);
						LOGGER.info("-----------其他分片：" + redisPropCache.get("redis.ip" + i) + ":" + redisPropCache.get("redis.port" + i));
					}
				}

				shardedJedisPool = new ShardedJedisPool(config, jdsInfoList);
			}
		}
	}

	/**
	 * 简单的字符串写入，不用序列化
	 */
	public static void set(String key, String value) {
		debug("set", key);

		if (StringUtils.isEmpty(value)) {
			return;
		}
		ShardedJedis redis = getRedisClient();
		try {
			redis.set(key, value);
		} catch (Exception e) {
			returnBrokenResource(redis);
			LOGGER.error("set error:" + key, e);
		} finally {
			returnJedis(redis);
		}
	}

	/**
	 * 简单的字符串读取，不用序列化
	 */
	public static String get(String key) {
		debug("get", key);

		if (StringUtils.isEmpty(key)) {
			return "";
		}
		ShardedJedis redis = getRedisClient();
		try {
			return redis.get(key);
		} catch (Exception e) {
			returnBrokenResource(redis);
			LOGGER.error("get error:" + key, e);
		} finally {
			returnJedis(redis);
		}
		return "";
	}

	/**
	 * 删除已存在的键
	 */
	public static void del(String key) {
		debug("del", key);

		ShardedJedis redis = getRedisClient();
		try {
			redis.del(key);
		} catch (Exception e) {
			LOGGER.error("del error:" + key, e);
		} finally {
			returnJedis(redis);
		}
	}

	/**
	 * 设置键值超时
	 */
	public static Long expire(String key, int seconds) {
		debug("expire", key);

		if (StringUtils.isEmpty(key)) {
			return 0L;
		}
		ShardedJedis redis = getRedisClient();
		try {
			return redis.expire(key, seconds);
		} catch (Exception e) {
			returnBrokenResource(redis);
			LOGGER.error("expire error:" + key, e);
		} finally {
			returnJedis(redis);
		}
		return 0L;
	}

	/**
	 * 根据键获取整个map 使用该函数注意性能！！！！
	 */
	public static Map<String, String> hgetAll(String key) {
		debug("hgetAll", key);

		ShardedJedis redis = getRedisClient();
		try {
			return redis.hgetAll(key);
		} catch (Exception e) {
			returnBrokenResource(redis);
			LOGGER.error("hgetAll error:" + key, e);
		} finally {
			returnJedis(redis);
		}
		return null;
	}

	/**
	 * 获取map中的键值
	 */
	public static String hget(String key, String field) {
		debug("hget", key);

		ShardedJedis redis = getRedisClient();
		try {
			return redis.hget(key, field);
		} catch (Exception e) {
			returnBrokenResource(redis);
			LOGGER.error("hget error:" + key, e);
		} finally {
			returnJedis(redis);
		}
		return null;
	}

	/**
	 * 写入map中的单个键值
	 */
	public static void hset(String key, String field, String value) {
		debug("hset", key);

		if (null == value || null == field || null == key) {
			return;
		}
		ShardedJedis redis = getRedisClient();
		try {
			redis.hset(key, field, value);
		} catch (Exception e) {
			returnBrokenResource(redis);
			LOGGER.error("hset error:" + key, e);
		} finally {
			returnJedis(redis);
		}
	}

	/**
	 * 删除map中的一个或多个键
	 */
	public static void hdel(String key, String... fields) {
		debug("hdel", key);

		ShardedJedis redis = getRedisClient();
		try {
			redis.hdel(key, fields);
		} catch (Exception e) {
			returnBrokenResource(redis);
			LOGGER.error("hdel error:" + key, e);
		} finally {
			returnJedis(redis);
		}
	}

	/**
	 * 用于获取哈希表中字段的数量
	 */
	public static long hlen(String key) {
		debug("hlen", key);

		if (StringUtils.isEmpty(key)) {
			return 0;
		}
		ShardedJedis redis = getRedisClient();
		try {
			return redis.hlen(key);
		} catch (Exception e) {
			returnBrokenResource(redis);
			LOGGER.error("hlen error:" + key, e);
		} finally {
			returnJedis(redis);
		}
		return 0;
	}

	/**
	 * 移除有序集中的一个或多个成员,不存在的成员将被忽略
	 */
	public static void zrem(String key, String... members) {
		debug("zrem", key);

		ShardedJedis redis = getRedisClient();
		try {
			redis.zrem(key, members);
		} catch (Exception e) {
			returnBrokenResource(redis);
			LOGGER.error("zrem error:" + key, e);
		} finally {
			returnJedis(redis);
		}
	}

	/**
	 * 用于将一个或多个成员元素及其分数值加入到有序集当中
	 */
	public static void zadd(String key, double score, String member) {
		debug("zadd", key);

		ShardedJedis redis = getRedisClient();
		try {
			redis.zadd(key, score, member);
		} catch (Exception e) {
			returnBrokenResource(redis);
			LOGGER.error("zadd error:" + key, e);
		} finally {
			returnJedis(redis);
		}
	}

	/**
	 * 简单的字符串写入，不用序列化，可设置过期时间
	 */
	public static void setex(String key, String value, int seconds) {
		debug("setex", key);

		if (StringUtils.isEmpty(value)) {
			return;
		}
		ShardedJedis redis = getRedisClient();
		try {
			redis.setex(key, seconds, value);
		} catch (Exception e) {
			returnBrokenResource(redis);
			LOGGER.error("setex error:" + key, e);
		} finally {
			returnJedis(redis);
		}
	}

	/**
	 * 返回列表的长度，如果列表 key 不存在,则 key 被解释为一个空列表,返回 0
	 */
	public static Long llen(String key) {
		debug("llen", key);

		if (StringUtils.isEmpty(key)) {
			return null;
		}
		ShardedJedis redis = getRedisClient();
		try {
			return redis.llen(key);
		} catch (Exception e) {
			returnBrokenResource(redis);
			throw new RuntimeException("llen error:" + key, e);
		} finally {
			returnJedis(redis);
		}
	}

	/**
	 * 批量获取map数据
	 */
	public static Map<String, Map<String, String>> batchGetMapCache(List<String> list) {
		debug("batchGetMapCache", null);

		if (null == list || list.isEmpty()) {
			return null;
		}
		int len = list.size();
		Map<String, Map<String, String>> retmap = new HashMap<String, Map<String, String>>();
		List<Response<Map<String, String>>> resplist = new ArrayList<Response<Map<String, String>>>();
		ShardedJedis redis = getRedisClient();
		try {
			ShardedJedisPipeline pl = redis.pipelined();
			for (int i = 0; i < len; i++) {
				resplist.add(pl.hgetAll(list.get(i)));
			}
			pl.sync();
			for (int i = 0; i < len; i++) {
				Response<Map<String, String>> resp = (Response<Map<String, String>>) resplist.get(i);
				retmap.put(list.get(i), resp.get());
			}
		} catch (Exception e) {
			returnBrokenResource(redis);
			LOGGER.error("batchGetMapCache error.", e);
		} finally {
			returnJedis(redis);
		}
		return retmap;
	}

	/**
	 * 检查键是否存在
	 */
	public static boolean exists(String key) {
		debug("exists", key);

		if (StringUtils.isEmpty(key)) {
			return false;
		}
		ShardedJedis redis = getRedisClient();
		try {
			return redis.exists(key);
		} catch (Exception e) {
			returnBrokenResource(redis);
			LOGGER.error("exists error:" + key, e);
		} finally {
			returnJedis(redis);
		}
		return false;
	}

	/**
	 * 将一个或多个值插入到列表头部
	 */
	public static Long lpush(String key, String... strings) {
		debug("lpush", key);

		if (StringUtils.isEmpty(key)) {
			return 0L;
		}
		ShardedJedis redis = getRedisClient();
		try {
			return redis.lpush(key, strings);
		} catch (Exception e) {
			returnBrokenResource(redis);
			LOGGER.error("lpush error:" + key, e);
		} finally {
			returnJedis(redis);
		}
		return 0L;
	}

	/**
	 * 移出并获取列表的最后一个元素， 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止
	 */
	public static List<String> brpop(int timeout, String key) {
		debug("brpop", key);

		List<String> content = new ArrayList<String>();
		ShardedJedis redis = getRedisClient();
		try {
			content = redis.brpop(timeout, key);
		} catch (Exception e) {
			e.printStackTrace();
			returnBrokenResource(redis);
		} finally {
			returnJedis(redis);
		}
		return content;
	}

	/**
	 * 移出并获取列表的最后一个元素， 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止
	 */
	public static String rpop(String key) {
		debug("rpop", key);

		String content = "";
		ShardedJedis redis = getRedisClient();
		try {
			content = redis.rpop(key);
		} catch (Exception e) {
			e.printStackTrace();
			returnBrokenResource(redis);
		} finally {
			returnJedis(redis);
		}
		return content;
	}

	/**
	 * 移除列表的最后一个元素，返回值为移除的元素 (max表示移除个数)
	 */
	public static List<String> rpop(String key, int max) {
		debug("rpop", key);

		List<String> valList = new ArrayList<String>();
		if (StringUtils.isEmpty(key)) {
			return valList;
		}
		ShardedJedis redis = getRedisClient();
		try {
			String val = null;
			for (int i = 0; i < max; i++) {
				val = redis.rpop(key);
				if (val != null) {
					valList.add(val);
				} else {
					break;
				}
			}
		} catch (Exception e) {
			returnBrokenResource(redis);
			LOGGER.error("rpop error:" + key, e);
		} finally {
			returnJedis(redis);
		}
		return valList;
	}

	/**
	 * 加锁
	 */
	public static boolean lock(String key) {
		return lock(key, TIMEOUT, EXPIRE);
	}

	/**
	 * 加锁
	 * 
	 * @param key
	 *            键值
	 * @param timeout
	 *            timeout的时间范围内轮询锁
	 * @param expire
	 *            设置锁超时时间
	 * @return 成功 or 失败
	 */
	public static boolean lock(String key, long timeout, int expire) {
		debug("lock", key);

		if (StringUtils.isEmpty(key)) {
			return false;
		}
		long nanoTime = System.nanoTime();
		timeout *= 1000000000;
		ShardedJedis redis = getRedisClient();
		try {
			// 在timeout的时间范围内不断轮询锁
			while (System.nanoTime() - nanoTime < timeout) {
				// 锁不存在的话，设置锁并设置锁过期时间，即加锁
				if (redis.setnx(key, key) == 1) {
					redis.expire(key, expire);
					return true;
				}
				Thread.sleep(200);
			}
		} catch (Exception e) {
			returnBrokenResource(redis);
			throw new RuntimeException("locking error:" + key, e);
		} finally {
			returnJedis(redis);
		}
		return false;
	}

	/**
	 * 解锁
	 */
	public static boolean unlock(String key) {
		debug("unlock", key);

		if (StringUtils.isEmpty(key)) {
			return false;
		}
		ShardedJedis redis = getRedisClient();
		try {
			redis.del(key);
			return true;
		} catch (Exception e) {
			returnBrokenResource(redis);
			throw new RuntimeException("unlocking error:" + key, e);
		} finally {
			returnJedis(redis);
		}
	}

	/**
	 * 发布消息
	 */
	public static boolean publish(String channel, String message) {
		debug("publish", channel);

		ShardedJedis redis = getRedisClient();
		try {
			Jedis[] jedisArray = new Jedis[] {};
			jedisArray = redis.getAllShards().toArray(jedisArray);
			Jedis jedis = jedisArray[0];

			jedis.publish(channel, message);
		} catch (Exception e) {
			LOGGER.error("publish error", e);
			returnBrokenResource(redis);
			return false;
		} finally {
			returnJedis(redis);
		}
		return true;
	}

	/**
	 * 接收消息。在main方法调用后，会一直执行下去。当有发布对应消息时，就会在jedisPubSub中接收到！
	 */
	public static void subscribe(JedisPubSub jedisPubSub, String channels) {
		debug("subscribe", channels);

		ShardedJedis redis = getRedisClient();
		try {
			Jedis[] jedisArray = new Jedis[] {};
			jedisArray = redis.getAllShards().toArray(jedisArray);
			Jedis jedis = jedisArray[0];

			jedis.subscribe(jedisPubSub, channels);
		} catch (Exception e) {
			returnBrokenResource(redis);
			LOGGER.error("subscribe error", e);
		} finally {
			returnJedis(redis);
		}
	}

	/**
	 * 返回有序集中指定分数区间内的所有的成员
	 */
	public static Set<String> zrevrangeByScore(String key, double max, double min) {
		debug("zrevrangeByScore", key);

		ShardedJedis redis = getRedisClient();
		try {
			return redis.zrevrangeByScore(key, max, min);
		} catch (Exception e) {
			returnBrokenResource(redis);
			LOGGER.error("zrevrangeByScore error.", e);
		} finally {
			returnJedis(redis);
		}
		return new HashSet<String>();
	}

	/**
	 * 根据前缀删除key
	 */
	public static void delByPrefix(String prefix) {
		if (StringUtils.isBlank(prefix)) {
			return;
		}
		debug("delByPrefix", prefix);

		ShardedJedis redis = getRedisClient();
		try {
			Jedis[] jedisArray = new Jedis[] {};
			jedisArray = redis.getAllShards().toArray(jedisArray);
			Jedis jedis = jedisArray[0];

			Set<String> set = jedis.keys(prefix + "*");
			Iterator<String> it = set.iterator();
			while (it.hasNext()) {
				String keyStr = it.next();
				jedis.del(keyStr);
			}
		} catch (Exception e) {
			returnBrokenResource(redis);
			LOGGER.error("delByPrefix error.", e);
		} finally {
			returnJedis(redis);
		}
	}

	/**
	 * 清空当前DB
	 */
	public static void flushDB() {
		debug("flushDB", null);

		ShardedJedis redis = getRedisClient();
		try {
			for (Jedis jedis2 : redis.getAllShards()) {
				jedis2.flushDB();
			}
			LOGGER.info("redis flushDB success.");
		} catch (Exception e) {
			returnBrokenResource(redis);
			LOGGER.error("redis flushDB error.", e);
		} finally {
			returnJedis(redis);
		}
	}

	/**
	 * 获取redis 服务器信息
	 */
	public static Map<String, List<RedisInfoDetail>> getRedisInfo() {
		debug("getRedisInfo", null);

		Map<String, List<RedisInfoDetail>> infoMap = new LinkedHashMap<String, List<RedisInfoDetail>>();
		ShardedJedis redis = getRedisClient();
		try {
			for (Jedis jedis : redis.getAllShards()) {
				List<RedisInfoDetail> ridList = new ArrayList<RedisInfoDetail>();

				String info = jedis.info();
				String[] strs = info.split("\n");
				if (strs != null && strs.length > 0) {
					for (int i = 0; i < strs.length; i++) {
						String s = strs[i];
						String[] str = s.split(":");
						if (str != null && str.length > 1) {
							if (importRedisInfoProps.contains(str[0])) {
								ridList.add(new RedisInfoDetail(str[0], str[1]));
							}
						}
					}
				}

				// 最大连接数
				List<String> maxclients = jedis.configGet("maxclients");
				if (CollectionUtils.isNotEmpty(maxclients) && maxclients.size() == 2) {
					ridList.add(new RedisInfoDetail(maxclients.get(0), maxclients.get(1)));
				}

				Client client = jedis.getClient();
				LOGGER.info(jedis.configGet("maxclients").get(1));
				infoMap.put(client.getHost() + ":" + client.getPort(), ridList);
			}
		} catch (Exception e) {
			returnBrokenResource(redis);
			LOGGER.error("getRedisInfo error.", e);
		} finally {
			returnJedis(redis);
		}
		return infoMap;
	}

	@SuppressWarnings("deprecation")
	private static void returnJedis(ShardedJedis redis) {
		try {
			shardedJedisPool.returnResource(redis);
		} catch (Exception e) {
			LOGGER.info(e.getMessage(), e);
		}
	}

	@SuppressWarnings("deprecation")
	private static void returnBrokenResource(ShardedJedis redis) {
		try {
			shardedJedisPool.returnBrokenResource(redis);
		} catch (Exception e) {
			LOGGER.info(e.getMessage(), e);
		}
	}

	public static Map<String, Object> getShardedJedisConfigInfo() {
		Map<String, Object> info = new HashMap<String, Object>();
		info.put("maxActive", redisMaxActive);
		info.put("maxIdle", redisMaxIdle);
		info.put("maxWait", redisMaxWait);
		return info;
	}

	public static ShardedJedisPool getShardedJedisPool() {
		return shardedJedisPool;
	}

	private static ShardedJedis getRedisClient() {
		if (null == shardedJedisPool) {
			init();
		}
		int timeoutCount = 0;
		while (true) {
			try {
				return shardedJedisPool.getResource();
			} catch (Exception e) {
				if (e instanceof JedisConnectionException) {
					timeoutCount++;
					if (timeoutCount > 3) {
						break;
					}
				} else {
					System.out.println("jedisInfo ... NumActive=" + shardedJedisPool.getNumActive() + ", NumIdle=" + shardedJedisPool.getNumIdle() + ", NumWaiters=" + shardedJedisPool.getNumWaiters()
							+ ", isClosed=" + shardedJedisPool.isClosed());
					System.out.println("shardedJedisPool.getResource error,");
					break;
				}
			}
			break;
		}
		return null;
	}

	/**
	 * 初始化EMAP属性缓存
	 */
	private static void initRedisProp() {
		Properties env = new Properties();
		redisPropCache = new HashMap<String, String>();

		InputStream in = null;
		try {
			in = new BufferedInputStream(new FileInputStream(Thread.currentThread().getContextClassLoader().getResource("").getPath() + "/redis.properties"));
		} catch (FileNotFoundException e1) {
			e1.printStackTrace();
		}
		try {
			env.load(in);
		} catch (IOException e) {
			LOGGER.error("initRedisProp error", e);
		} finally {
			try {
				if (in != null) {
					in.close();
				}
			} catch (IOException e) {
				LOGGER.error("initRedisProp error", e);
			}
		}
		for (String key : env.stringPropertyNames()) {
			redisPropCache.put(key, env.getProperty(key));
		}
	}

	private static void debug(String msg, String key) {
		if (debugMode) {
			LOGGER.info("--------->>>" + msg + ":" + ConvertUtil.strOf(key));
		}
	}
}